Python并发编程标准库concurrent.futures模块

85次阅读
没有评论

共计 1983 个字符,预计需要花费 5 分钟才能阅读完成。

并发:多线程交替操作同一资源类
并行:多线程同时操作多个资源类

concurrent.futures

是 Python 3.2 引入的新模块,它是在 threading 和 multiprocessing 之上的一个通用抽象层,提供了 ThreadPoolExecutor 和 ProcessPoolExecutor 两个类,以便使用线程池 / 进程池并发地执行任务。

map 与 submit 方法

map 方法很简单,map 的结果和入参顺序是对应的。

future 模式更强大,用 as_completed 顺序是不定的。

import httpx
from parsel import Selector
from concurrent.futures import ThreadPoolExecutor, as_completed

urls = [f"https://news.cnblogs.com/n/page/{page}" for page in range(1, 10 + 1)]

def craw(url):
    r = httpx.get(url)
    return r.text

def parse(html):
    selector = Selector(html)
    title = selector.css(".content h2.news_entry a::text")[0].get()
    return title

with ThreadPoolExecutor() as pool:
    htmls = pool.map(craw, urls)
    htmls = list(zip(urls, htmls))
    for url, html in htmls:
        print(url, len(html))
print("craw over")

with ThreadPoolExecutor() as pool:
    futures = {}
    for url, html in htmls:
        future = pool.submit(parse, html)
        futures[future] = url
    for future in as_completed(futures):
        url = futures[future]
        print(url, future.result())
print("parse over")

Web 服务多线程加速

import flask
import json
import time
from concurrent.futures import ThreadPoolExecutor

app = flask.Flask(__name__)
pool = ThreadPoolExecutor()

def read_file():
    time.sleep(0.1)
    return "file rusult"

def read_db():
    time.sleep(0.2)
    return "db rusult"

def read_api():
    time.sleep(0.1)
    return "api rusult"

@app.route("/")
def index():
    result_file = pool.submit(read_file)
    result_db = pool.submit(read_db)
    result_api = pool.submit(read_api)
    return json.dumps(
        {"result_file": result_file.result(),
            "result_db": result_db.result(),
            "result_api": result_api.result(),}
    )

if __name__ == "__main__":
    app.run()

Web 服务多进程加速

from flask import Flask
import json
import math
from concurrent.futures import ProcessPoolExecutor

app = Flask(__name__)

def is_prime(n):
    if n < 2:
        return False
    if n == 2:
        return True
    if n % 2 == 0:
        return False

    sqrt_n = int(math.floor(math.sqrt(n)))
    for i in range(3, sqrt_n, 2):
        if n % i == 0:
            return False

    return True

@app.route("/is-prime/<numbers>")
def api_is_prime(numbers: str):
    number_list = [int(x) for x in numbers.split(",")]
    results = process_pool.map(is_prime, number_list)
    return json.dumps(dict(zip(number_list, results)))

if __name__ == "__main__":
    process_pool = ProcessPoolExecutor()
    app.run()

正文完
 0
三毛笔记
版权声明:本站原创文章,由 三毛笔记 于2023-12-28发表,共计1983字。
转载说明:除特殊说明外本站文章皆由CC-4.0协议发布,转载请注明出处。
评论(没有评论)